package com.jfinal.plugin.zbus.proto;

import com.jfinal.log.Logger;
import org.zbus.broker.Broker;
import org.zbus.mq.Producer;
import org.zbus.mq.Protocol;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static com.google.common.collect.Maps.newConcurrentMap;

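/**
 * Producer-side helper for zbus. It bundles the fluent steps needed to obtain
 * message producers: configure ({@code broker}, {@code init}, {@code maxProducerNum}),
 * instantiate ({@code create}), create the queue on the broker ({@code go}),
 * and keep track of the created producer maps ({@code register}).
 * Created by dairymix on 17/5/3.
 */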
public class ZbusProducer {

    private static final Logger log = Logger.getLogger(ZbusProducer.class);

    /** Broker the producers are bound to; assigned by {@link #broker(Broker)}. */
    public Broker broker;
    /** Queue name handed to every {@link Producer} created from the stored configuration. */
    String mqName;
    /** Queue mode handed to every {@link Producer} created from the stored configuration. */
    Protocol.MqMode mqMode;

    /** Producers created by this instance, keyed by their creation index (0-based). */
    public Map<Integer,Producer> producerMap = newConcurrentMap();
    /** Requested producer count; {@code null} or a non-positive value means "use the default of 1". */
    Integer producer_max_num;

    /**
     * Sets the broker used by this instance.
     *
     * @param broker the broker to use; when {@code null}, one is obtained from
     *               {@code new ZbusBroker().doing().me}
     * @return this instance, for chaining
     */
    public ZbusProducer broker(Broker broker){
        if (broker != null){
            this.broker = broker;
        } else {
            this.broker = new ZbusBroker().doing().me;
        }
        return this;
    }

    /**
     * Sets the broker to the one obtained from {@code ZbusBroker}.
     *
     * @return this instance, for chaining
     */
    public ZbusProducer broker(){
        return broker(null);
    }

    /**
     * Stores the queue name and mode used by the argument-less {@code create} overloads
     * and by {@code ensureProducer}.
     *
     * @param mqName queue name
     * @param mqMode queue mode
     * @return this instance, for chaining
     */
    public ZbusProducer init(String mqName, Protocol.MqMode mqMode){
        this.mqName = mqName;
        this.mqMode = mqMode;
        return this;
    }

    /**
     * Stores the requested number of producers.
     *
     * @param producerNum requested count; may be {@code null}
     * @return this instance, for chaining
     */
    public ZbusProducer maxProducerNum(Integer producerNum){
        this.producer_max_num = producerNum;
        return this;
    }

    /**
     * @return the configured queue name
     */
    public String mqName(){
        return this.mqName;
    }

    /**
     * Returns the effective producer count.
     *
     * @return the stored count when it is non-null and positive, otherwise 1
     */
    public Integer maxProducerNum(){
        return (producer_max_num != null && producer_max_num >0)?producer_max_num:1;
    }

    /*****************************************/

    /**
     * Creates producers and stores them in {@link #producerMap} under the keys
     * {@code 0 .. count-1}, replacing any entries already stored under those keys.
     *
     * @param broker       broker passed to each {@link Producer}
     * @param mqName       queue name passed to each {@link Producer}
     * @param mqMode       queue mode passed to each {@link Producer}
     * @param producer_num number of producers to create; {@code null} falls back to
     *                     {@link #maxProducerNum()}, a non-positive value creates none
     * @return this instance, for chaining
     */
    public ZbusProducer create(Broker broker, String mqName, Protocol.MqMode mqMode,Integer producer_num){
        // A null count previously failed on unboxing; use the configured default instead.
        int count = (producer_num != null) ? producer_num : maxProducerNum();
        for (int i = 0; i < count; i++) {
            producerMap.put(i,new Producer(broker, mqName, mqMode));
        }
        if (count > 0) {
            // Register the map once per call; registering inside the loop added the
            // same map instance to producerMapList once per producer.
            register(producerMap);
        }
        log.info("创建MQ生产者成功(mq=" + mqName + ")");
        return this;
    }

    /**
     * Creates {@link #maxProducerNum()} producers for the given broker and queue.
     *
     * @return this instance, for chaining
     */
    public ZbusProducer create(Broker broker, String mqName, Protocol.MqMode mqMode){
        return create(broker,mqName,mqMode,maxProducerNum());
    }

    /**
     * Creates {@code producer_num} producers from the stored broker, queue name and mode.
     *
     * @param producer_num number of producers to create; {@code null} falls back to
     *                     {@link #maxProducerNum()}
     * @return this instance, for chaining
     */
    public ZbusProducer create(Integer producer_num){
        return create(broker,mqName,mqMode,producer_num);
    }

    /**
     * Creates {@link #maxProducerNum()} producers from the stored broker, queue name and mode.
     *
     * @return this instance, for chaining
     */
    public ZbusProducer create(){
        return create(broker,mqName,mqMode,maxProducerNum());
    }

    /**
     * Calls {@code createMQ()} on every producer in the given map.
     * Failures are logged and do not propagate to the caller. If the thread is
     * interrupted, the interrupt flag is restored and the remaining producers are skipped.
     *
     * @param producerMap producers to process; {@code null} is a no-op
     * @return this instance, for chaining
     */
    public ZbusProducer go(Map<Integer,Producer> producerMap){
        if (producerMap == null){
            return this;
        }
        for (Producer producer : producerMap.values()) {
            try {
                producer.createMQ();
            } catch (IOException e) {
                log.error("Failed to create MQ for producer (mq=" + mqName + ")", e);
            } catch (InterruptedException e) {
                // Restore the flag so callers further up the stack can observe the interruption.
                Thread.currentThread().interrupt();
                log.error("Interrupted while creating MQ for producer (mq=" + mqName + ")", e);
                break;
            }
        }
        return this;
    }

    /**
     * Calls {@code createMQ()} on every producer held by this instance.
     *
     * @return this instance, for chaining
     */
    public ZbusProducer go(){
        return go(producerMap);
    }

    /**
     * Runs the whole pipeline (broker, init, maxProducerNum, create, go) on a
     * <em>new</em> {@code ZbusProducer}; the instance this method is called on is not modified.
     *
     * @param broker       broker to use; {@code null} lets {@link #broker(Broker)} obtain one
     * @param mqName       queue name
     * @param mqMode       queue mode
     * @param producer_num number of producers to create
     * @return the newly built instance
     */
    public ZbusProducer doing(Broker broker, String mqName, Protocol.MqMode mqMode, Integer producer_num){
        return new ZbusProducer()
                .broker(broker)
                .init(mqName,mqMode)
                .maxProducerNum(producer_num)
                .create(producer_num)
                .go();
    }

    /**
     * Same as {@link #doing(Broker, String, Protocol.MqMode, Integer)} with a {@code null} broker.
     */
    public ZbusProducer doing(String mqName, Protocol.MqMode mqMode, Integer producer_num){
        return doing(null,mqName,mqMode,producer_num);
    }

    /**
     * Same as {@link #doing(Broker, String, Protocol.MqMode, Integer)}, taking the producer
     * count from this instance's {@link #maxProducerNum()}.
     */
    public ZbusProducer doing(Broker broker, String mqName, Protocol.MqMode mqMode){
        return doing(broker,mqName,mqMode,maxProducerNum());
    }

    /**
     * Same as {@link #doing(Broker, String, Protocol.MqMode)} with a {@code null} broker.
     */
    public ZbusProducer doing(String mqName, Protocol.MqMode mqMode){
        return doing(null,mqName,mqMode);
    }

    /**
     * Empties {@link #producerMap}, so that a later {@code ensureProducer} call creates
     * producers again.
     *
     * @return this instance, for chaining
     */
    public ZbusProducer close(){
        // ensureProducer treats a null map as possible, so do the same here.
        if (producerMap != null) {
            producerMap.clear();
        }
        return this;
    }

    /*****************************************/

    /**
     * Creates producers for the stored queue name and mode if this instance currently holds none.
     * The emptiness check and the creation both run while holding this instance's monitor.
     *
     * @param broker           broker to use; {@code null} lets {@link #broker(Broker)} obtain one
     * @param producer_max_num requested producer count; {@code null} or a non-positive value means 1
     * @return this instance, for chaining
     * @throws IOException          kept for backward compatibility of the signature
     * @throws InterruptedException kept for backward compatibility of the signature
     */
    public ZbusProducer ensureProducer(Broker broker,Integer producer_max_num) throws IOException, InterruptedException{
        synchronized (this) {
            // The check sits inside the lock so that concurrent callers do not both create producers.
            if(producerMap == null || producerMap.isEmpty()){
                maxProducerNum(producer_max_num);
                // Pass the effective count explicitly; the static createProducer helpers
                // always build a single producer and therefore ignored the requested number.
                producerMap = doing(broker, mqName, mqMode, maxProducerNum()).producerMap;
            }
        }
        return this;
    }

    /**
     * Same as {@link #ensureProducer(Broker, Integer)} using the stored broker.
     */
    public ZbusProducer ensureProducer(Integer producer_max_num) throws IOException, InterruptedException{
        return ensureProducer(broker,producer_max_num);
    }

    /**
     * Same as {@link #ensureProducer(Integer)} using {@link #maxProducerNum()}.
     */
    public ZbusProducer ensureProducer() throws IOException, InterruptedException{
        return ensureProducer(maxProducerNum());
    }

    /*****************************************/

    /**
     * Builds a ready-to-use {@code ZbusProducer} with the default producer count.
     *
     * @param broker broker to use; {@code null} lets {@link #broker(Broker)} obtain one
     * @param mqName queue name
     * @param mqMode queue mode
     * @return the newly built instance
     */
    public static ZbusProducer createZbusProducer(Broker broker, String mqName, Protocol.MqMode mqMode){
        return new ZbusProducer().doing(broker,mqName,mqMode);
    }

    /**
     * Same as {@link #createZbusProducer(Broker, String, Protocol.MqMode)} with a {@code null} broker.
     */
    public static ZbusProducer createZbusProducer(String mqName, Protocol.MqMode mqMode){
        return createZbusProducer(null,mqName,mqMode);
    }

    /**
     * Builds a {@code ZbusProducer} and returns its producer map.
     *
     * @return the map of producers held by the newly built instance
     */
    public static Map<Integer,Producer> createProducer(Broker broker, String mqName, Protocol.MqMode mqMode){
        return createZbusProducer(broker,mqName,mqMode).producerMap;
    }

    /**
     * Same as {@link #createProducer(Broker, String, Protocol.MqMode)} with a {@code null} broker.
     */
    public static Map<Integer,Producer> createProducer(String mqName, Protocol.MqMode mqMode){
        return createProducer(null,mqName,mqMode);
    }

    /**
     * Builds a {@code ZbusProducer} and returns the producer stored under key 0.
     *
     * @return the first producer of the newly built instance
     */
    public static Producer createSingleProducer(Broker broker, String mqName, Protocol.MqMode mqMode){
        return createProducer(broker,mqName,mqMode).get(0);
    }

    /**
     * Same as {@link #createSingleProducer(Broker, String, Protocol.MqMode)} with a {@code null} broker.
     */
    public static Producer createSingleProducer(String mqName, Protocol.MqMode mqMode){
        return createSingleProducer(null,mqName,mqMode);
    }

    /******************************/

    /**
     * Producer maps registered through {@link #register(Map)}.
     */
    public final List<Map<Integer,Producer>> producerMapList =  new ArrayList<>();

    /**
     * Appends the given producer map to {@link #producerMapList}.
     *
     * @param producerMap map to register; {@code null} is ignored
     * @return this instance, for chaining
     */
    public ZbusProducer register(Map<Integer,Producer> producerMap){
        if (producerMap != null) {
            producerMapList.add(producerMap);
        }
        return this;
    }

}
